/**
 * Copyright (c) 2015, 玛雅牛［李飞］ (lifei@wellbole.com).
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.jfinal.plugin.zbus;

import com.jfinal.log.Logger;
import com.jfinal.plugin.IPlugin;
import org.zbus.broker.BrokerConfig;

/**
 * @ClassName: ZbusPlugin
 * @Description: JFinal的Zbus插件实现
 * @author 李飞 (lifei@wellbole.com)
 * @date 2015年7月29日 下午12:46:32
 * @since V1.0.0
 */
public class ZbusPlugin implements IPlugin {

	/**
	 * 日志
	 */
	private static final Logger LOG = Logger.getLogger("ZbusPlugin");

	private ZbusAdaptor adaptor = new ZbusAdaptor();

	/**
	 * MQ消费者配置Map
	 */
//	private final Map<String, TMsgHandler<?>> mqNameTMsgHandlerMap = new HashMap<String, TMsgHandler<?>>();

	/**
	 * Topic消费者配置Map mp - topic - TMsgHandler
	 */
//	private final Map<String, Map<String, TMsgHandler<?>>> mqNamePubSubTMsgHandlerMap = new HashMap<String, Map<String, TMsgHandler<?>>>();

	/**
	 * 消费者列表
	 */
//	private final List<Consumer> consumerList = new ArrayList<Consumer>();

	/**
	 * 发送器列表
	 */
//	private final static List<Sender<?>> senderList =  new ArrayList<Sender<?>>();

	/**
	 * zbusServer地址
	 */
	private String brokerAddress = null;

    public ZbusPlugin brokerAddress(String brokerAddress){
        if(brokerAddress != null && !brokerAddress.isEmpty()) {
            this.brokerAddress = brokerAddress;
        } else {
            this.brokerAddress = "127.0.0.1:15555";
        }
        return this;
    }

	/**
	 * broker对象
	 */
//	private static Broker broker = null;

	/**
	 * 简单Broker配置
	 */
//	private BrokerConfig brokerConfig = null;

	private String scanRootPackage;

    public ZbusPlugin scanPackage(String scanRootPackage){
        if (scanRootPackage != null && !scanRootPackage.isEmpty()){
            this.scanRootPackage = scanRootPackage;
        }
        return this;
    }

    private Integer maxConsumerNum = 1;

    public ZbusPlugin maxConsumerNum(Integer consumerNum){
        this.maxConsumerNum = (consumerNum != null && consumerNum > 0)?consumerNum:1;
        return this;
    }

    private Boolean hasGlobalMaxNum = false;

    public ZbusPlugin hasGlobalMaxNum(Boolean useMaxNum){
        this.hasGlobalMaxNum = (useMaxNum != null)?useMaxNum:false;
        return this;
    }

    public String getBrokerAddress(){
        return this.brokerAddress;
    }

    public String getScanRootPackage(){
        return this.scanRootPackage;
    }

    public Integer getMaxConsumerNum(){
        return this.maxConsumerNum;
    }

    public ZbusAdaptor adaptor(){
        return this.adaptor;
    }

	//原构造流程由于将创建对象,创建broker,加载注解绑定到了一起
    //因此注册监听这类流程无法嵌入.
    //这里需要将这些流程拆分开.
	public ZbusPlugin(String brokerAddress, String scanRootPackage,
                      Boolean hasGlobalMaxNum,Integer consumerNum) {
        init(brokerAddress,scanRootPackage,hasGlobalMaxNum,consumerNum);
		//this.brokerAddress = brokerAddress;
		//this.scanRootPackage = scanRootPackage;
		//创建broker
		//ensureBroker();
		//加载注解
		//autoLoadByAnnotation();
	}

    /**
     * 默认构造函数,可指定brokerAddress和consumer个数,不设置扫描路径
     */
    public ZbusPlugin(String brokerAddress, Boolean hasGlobalMaxNum, Integer consumerNum) {
        this(brokerAddress, null, hasGlobalMaxNum, consumerNum);
    }

    /**
     * 默认构造函数，使用127.0.0.1:15555地址,不设置扫描路径
     */
    public ZbusPlugin(Boolean hasGlobalMaxNum,Integer consumerNum) {
        this("127.0.0.1:15555",hasGlobalMaxNum,consumerNum);
    }

    /**
     * 默认构造函数，使用127.0.0.1:15555地址,不设置扫描路径,只启动一个consumer
     */
    public ZbusPlugin() {
        this(false,1);
    }

//	/**
//	 * 构造函数 使用BrokerConfig config构建
//	 */
//	public ZbusPlugin(BrokerConfig config) {
//		this(config, null);
//	}

//	public ZbusPlugin(BrokerConfig config, String scanRootPackage) {
//		this.brokerConfig = config;
//		this.scanRootPackage = scanRootPackage;
//		//创建broker
//		ensureBroker();
//		//加载注解
//		autoLoadByAnnotation();
//	}

    public ZbusPlugin init(String brokerAddress, String scanRootPackage,
                           Boolean hasGlobalMaxNum,Integer consumerNum){
        brokerAddress(brokerAddress).scanPackage(scanRootPackage)
                .hasGlobalMaxNum(hasGlobalMaxNum).maxConsumerNum(consumerNum);
        //load();
        return this;
    }


    public ZbusPlugin broker(BrokerConfig config){
        adaptor.broker.doing(brokerAddress,config);
        return this;
    }

    public ZbusPlugin broker(){
        return broker(null);
    }

    public ZbusPlugin loadAnno(){
        adaptor.autoLoadByAnnotation(this.scanRootPackage);//加载注解
        return this;
    }

    public ZbusPlugin load(){
        //创建broker
        //ensureBroker();
        //加载注解
        //autoLoadByAnnotation();
        broker().loadAnno();
        return this;
    }

//    /**
//     * @Title: createProducer
//     * @Description: 创建一个Producer
//     * @return Producer
//     * @throws InterruptedException
//     * @throws IOException
//     * @since V1.0.0
//     */
//    public static Producer createProducer(Broker broker, Sender<?> sender, String mq, MqMode mqMode) throws IOException, InterruptedException{
//        Producer producer = new Producer(broker, mq, mqMode);
//        producer.createMQ();
//        //这册这个发送器
//        senderList.add(sender);
//        return producer;
//    }

//    public static Producer createProducer(Sender<?> sender, String mq, MqMode mqMode) throws IOException, InterruptedException{
//        return createProducer(broker, sender, mq, mqMode);
//    }

//    /**
//	 * @Title: registerMqMsgHandler
//	 * @Description: 注册Mq的消息回调接口
//	 * @param mq
//	 *            MQ名
//	 * @param msgHandler
//	 *            消息到达回调接口
//	 * @since V1.0.0
//	 */
//	public void registerMqMsgHandler(String mq, TMsgHandler<?> msgHandler) {
//		if (mqNameTMsgHandlerMap.containsKey(mq)) {
//			LOG.warn("(mq=" + mq + ")对应的消息处理器已存在!");
//		}
//		mqNameTMsgHandlerMap.put(mq, msgHandler);
//	}

//	/**
//	 * @Title: registerTopicMsgHandler
//	 * @Description: 注册Topic的消息回调接口
//	 * @param topic
//	 *            Topic对象
//	 * @param msgHandler
//	 *            消息到达回调接口
//	 * @since V1.0.0
//	 */
//	public void registerTopicMsgHandler(Topic topic, TMsgHandler<?> msgHandler) {
//		this.registerTopicMsgHandler(topic.getMqName(), topic.getTopicName(), msgHandler);
//	}

//	/**
//	 * @Title: registerTopicMsgHandler
//	 * @Description: 注册Topic的消息回调接口
//	 * @param mq
//	 *            MQ名
//	 * @param topic
//	 *            主题名
//	 * @param msgHandler
//	 *            消息到达回调接口
//	 * @since V1.0.0
//	 */
//	public void registerTopicMsgHandler(String mq, String topic, TMsgHandler<?> msgHandler) {
//		// 依据mq获得 topic－TMsgHandler映射map
//		Map<String, TMsgHandler<?>> tmc = this.mqNamePubSubTMsgHandlerMap.get(mq);
//		if (null == tmc) {
//			tmc = new HashMap<String, TMsgHandler<?>>();
//		}
//		if(tmc.containsKey(topic)){
//			LOG.warn("(mq=" + mq + ",topic=" + topic + ")对应的消息处理器已存在!");
//		}
//		tmc.put(topic, msgHandler);
//		this.mqNamePubSubTMsgHandlerMap.put(mq, tmc);
//	}

	/**
	 * @Title: ensureBroker
	 * @Description: 确保broker可用
	 * @throws Exception
	 * @since V1.0.0
	 */
//	public ZbusPlugin ensureBroker(){
//		if(broker == null){
//    		synchronized (this) {
//				if(broker == null){
//					if(this.brokerAddress != null){
//						this.brokerConfig = new BrokerConfig();
//						this.brokerConfig.setBrokerAddress(brokerAddress);
//					}
//					try {
//						broker = new SingleBroker(this.brokerConfig);
//						LOG.info("创建broker成功(brokerAddress=" + this.brokerAddress + ")");
//					} catch (IOException e) {
//						throw new RuntimeException(e.getMessage(),e);
//					}
//				}
//			}
//    	}
//    	return this;
//	}

//	@Override
//	public boolean start() {
//		try {
//			//确保创建
//			ensureBroker();
//			// 创建Mq消费者
//			for (Entry<String, TMsgHandler<?>> entry : this.mqNameTMsgHandlerMap.entrySet()) {
//				String mq = entry.getKey();
//				Consumer c = new Consumer(broker, mq, MqMode.MQ);
//				c.onMessage(entry.getValue());
//				c.start();
//				consumerList.add(c);
//				LOG.info("创建MQ消费者成功(mq=" + mq + ")");
//			}
//			// 创建topic消费者
//			for (Entry<String, Map<String, TMsgHandler<?>>> mqConfig :
//			        this.mqNamePubSubTMsgHandlerMap.entrySet()) {
//				String mq = mqConfig.getKey();
//				// topic <－> TMsgHandler 映射map
//				Map<String, TMsgHandler<?>> tmt = mqConfig.getValue();
//				for (Entry<String, TMsgHandler<?>> topicConfig : tmt.entrySet()) {
//					String topic = topicConfig.getKey();
//					Consumer c = new Consumer(broker, mq, MqMode.PubSub);
//					c.setTopic(topic);
//					c.onMessage(topicConfig.getValue());
//					c.start();
//					consumerList.add(c);
//					LOG.info("创建Topic消费者成功 (mq=" + mq + ",topic=" + topic + ")");
//				}
//			}
//			return true;
//		} catch (Exception e) {
//			LOG.error(e.getMessage(), e);
//			throw new RuntimeException(e.getMessage(), e);
//		}
//	}

    @Override
    public boolean start() {
        try {
            //确保创建
            adaptor.broker.create();
            // 创建Mq消费者
            //createQConsumer(tMsgHandlerQMap);
            adaptor.consumer.doingAllQ(adaptor.broker.me,hasGlobalMaxNum,maxConsumerNum);
            // 创建topic消费者
            //createTConsumer(tMsgHandlerTMap);
            adaptor.consumer.doingAllT(adaptor.broker.me,hasGlobalMaxNum,maxConsumerNum);
            return true;
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            throw new RuntimeException(e.getMessage(), e);
        }
    }

//	@Override
//	public boolean stop() {
//		try {
//			// 关闭消费者
//			for (Consumer c : consumerList) {
//				c.close();
//				c = null;
//			}
//			consumerList.clear();
//
//			//关闭所有发送器
//			for(Sender<?> sender : senderList){
//				sender.close();
//			}
//			senderList.clear();
//
//			// 关闭broker
//			if (broker != null) {
//				broker.close();
//				broker = null;
//			}
//			return true;
//		} catch (IOException e) {
//			LOG.error(e.getMessage(), e);
//			return false;
//		}
//	}

    @Override
    public boolean stop() {
        adaptor.zbus.consumer.close();
        adaptor.zbus.producer.close();
        adaptor.zbus.broker.close();
        return true;
    }

//	public ZbusPlugin autoLoadByAnnotation(){
//
//		if(!StrKit.isBlank(this.scanRootPackage)){
//            try {
//                //自动扫描相关类库检查
//                Class.forName("org.reflections.Reflections");
//                Class.forName("com.google.common.collect.Sets");
//                Class.forName("javassist.bytecode.annotation.Annotation");
//            } catch (Exception e) {
//                throw new RuntimeException("Zbus开启自动扫描加载消息处理器需要Reflections、Guava、Javassist类库，请导入相应的jar包");
//            }
//
//            try { //检查rootPackage的有效性
//                Class.forName(this.scanRootPackage);
//            } catch (Exception e) {
//                throw new RuntimeException("Zbus开启自动扫描时不能填写无效的空包地址");
//            }
//
//            //首先检查是否具有相关的库
//            Reflections reflections = new Reflections(this.scanRootPackage);
//            Set<Class<?>> mqHandlerClasses = reflections.getTypesAnnotatedWith(MqHandler.class);
//            for (Class<?> mc : mqHandlerClasses) {
//                if(!TMsgHandler.class.isAssignableFrom(mc)){
//                    throw new RuntimeException(mc.getName() + " 必须继承自 TMsgHandler<T>");
//                }
//                MqHandler mh = mc.getAnnotation(MqHandler.class);
//                try {
//                    if(mh.enable()) {
//                        TMsgHandler<?> hander = (TMsgHandler<?>) mc.newInstance();
//                        this.registerMqMsgHandler(mh.value(), hander);
//                        LOG.info("通过注解自动加载MQ消息处理器( mq=" + mh.value()  + ",handler=" + mc.getName() + " )");
//                    }
//                } catch (Exception e) {
//                    throw new RuntimeException(e.getMessage(),e);
//                }
//            }
//
//            Set<Class<?>> topicHandlerClasses = reflections.getTypesAnnotatedWith(TopicHandler.class);
//            for (Class<?> mc : topicHandlerClasses) {
//                if(!TMsgHandler.class.isAssignableFrom(mc)){
//                    throw new RuntimeException(mc.getName() + " 必须继承自 TMsgHandler<T>");
//                }
//                TopicHandler th = mc.getAnnotation(TopicHandler.class);
//                try {
//                    if(th.enable()) {
//                        TMsgHandler<?> hander = (TMsgHandler<?>) mc.newInstance();
//                        this.registerTopicMsgHandler(th.mq(), th.topic(), hander);
//                        LOG.info("通过注解自动加载Topic消息处理器( mq=" + th.mq()  + ",topic=" + th.topic() + ",handler=" + mc.getName() + " )");
//                    }
//                } catch (Exception e) {
//                    throw new RuntimeException(e.getMessage(),e);
//                }
//            }
//		}
//        return this;
//	}
}
